# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

import warnings
warnings.warn('This Store implementation is still being debugged. '
              'It is currently running out of db lockers after adding around 2k triples.')

from rdflib.store import Store, VALID_STORE, CORRUPTED_STORE, NO_STORE, UNKNOWN
from rdflib.URIRef import URIRef

from bsddb import db
from os import mkdir, rmdir, makedirs
from os.path import exists, abspath, join
from urllib import pathname2url
from threading import Thread
from time import sleep, time
import logging

SUPPORT_MULTIPLE_STORE_ENVIRON = False

_logger = logging.getLogger(__name__)

class BerkeleyDB(Store):
    """
    A transaction-capable BerkeleyDB store.

    The major differences from the non-transactional stores are:

    - a dbTxn attribute holding the transaction object used for all bsddb databases
    - all operations (put, delete, get) are passed the dbTxn instance
    - the actual directory used for the bsddb persistence is the identifier,
      created as a subdirectory of the 'path'

    A usage sketch appears at the end of this module.
    """
    context_aware = True
    formula_aware = True
    transaction_aware = True

    def __init__(self, configuration=None, identifier=None):
        self.__open = False
        self.__identifier = identifier or 'home'
        super(BerkeleyDB, self).__init__(configuration)
        self.configuration = configuration
        self._loads = self.node_pickler.loads
        self._dumps = self.node_pickler.dumps
        self.dbTxn = None

    def __get_identifier(self):
        return self.__identifier

    identifier = property(__get_identifier)

    def destroy(self, configuration):
        """
        Destroy the underlying bsddb persistence for this store.
        """
        if SUPPORT_MULTIPLE_STORE_ENVIRON:
            fullDir = join(configuration, self.identifier)
        else:
            fullDir = configuration
        if exists(configuration):
            self.close()
            db.DBEnv().remove(fullDir, db.DB_FORCE)

    def open(self, path, create=True):
        if self.__open:
            return None
        homeDir = path
        if SUPPORT_MULTIPLE_STORE_ENVIRON:
            fullDir = join(homeDir, self.identifier)
        else:
            fullDir = homeDir
        envsetflags = db.DB_CDB_ALLDB
        envflags = db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | db.DB_INIT_TXN | db.DB_RECOVER
        if not exists(fullDir):
            if create:
                makedirs(fullDir)
                self.create(path)
            else:
                return NO_STORE
        if self.__identifier is None:
            self.__identifier = URIRef(pathname2url(abspath(fullDir)))

        self.db_env = db_env = db.DBEnv()
        db_env.set_cachesize(0, 1024 * 1024 * 50)  # 50 MB cache
        db_env.open(fullDir, envflags | db.DB_CREATE, 0)

        # A single environment-level transaction is shared by every database operation.
        self.dbTxn = db_env.txn_begin()
        self.__open = True

        dbname = None
        dbtype = db.DB_BTREE
        dbopenflags = db.DB_THREAD
        dbmode = 0660
        dbsetflags = 0

        # The three triple indices: cspo, cpos and cosp.
        self.__indicies = [None] * 3
        self.__indicies_info = [None] * 3
        for i in xrange(0, 3):
            index_name = to_key_func(i)(('s', 'p', 'o'), 'c')
            index = db.DB(db_env)
            index.set_flags(dbsetflags)
            index.open(index_name, dbname, dbtype, dbopenflags | db.DB_CREATE, dbmode, txn=self.dbTxn)
            self.__indicies[i] = index
            self.__indicies_info[i] = (index, to_key_func(i), from_key_func(i))

        # For each of the 8 (subject, predicate, object) boundness patterns, pick
        # the index whose key ordering yields the longest contiguous bound prefix.
        lookup = {}
        for i in xrange(0, 8):
            results = []
            for start in xrange(0, 3):
                score = 1
                len = 0
                for j in xrange(start, start + 3):
                    if i & (1 << (j % 3)):
                        score = score << 1
                        len += 1
                    else:
                        break
                tie_break = 2 - start
                results.append(((score, tie_break), start, len))

            results.sort()
            score, start, len = results[-1]

            def get_prefix_func(start, end):

                def get_prefix(triple, context):
                    if context is None:
                        yield ''
                    else:
                        yield context
                    i = start
                    while i < end:
                        yield triple[i % 3]
                        i += 1
                    yield ''

                return get_prefix

            lookup[i] = (self.__indicies[start],
                         get_prefix_func(start, start + len),
                         from_key_func(start),
                         results_from_key_func(start, self._from_string))

        self.__lookup_dict = lookup

        self.__contexts = db.DB(db_env)
        self.__contexts.set_flags(dbsetflags)
        self.__contexts.open('contexts', dbname, dbtype, dbopenflags | db.DB_CREATE, dbmode, txn=self.dbTxn)

        self.__namespace = db.DB(db_env)
        self.__namespace.set_flags(dbsetflags)
        self.__namespace.open('namespace', dbname, dbtype, dbopenflags | db.DB_CREATE, dbmode, txn=self.dbTxn)

        self.__prefix = db.DB(db_env)
        self.__prefix.set_flags(dbsetflags)
        self.__prefix.open('prefix', dbname, dbtype, dbopenflags | db.DB_CREATE, dbmode, txn=self.dbTxn)

        self.__i2k = db.DB(db_env)
        self.__i2k.set_flags(dbsetflags)
        self.__i2k.open('i2k', dbname, db.DB_HASH, dbopenflags | db.DB_CREATE, dbmode, txn=self.dbTxn)

        self.__needs_sync = False
        t = Thread(target=self.__sync_run)
        t.setDaemon(True)
        t.start()
        self.__sync_thread = t
        return VALID_STORE

    def __sync_run(self):
        # Background thread: batch up syncs so that a burst of writes triggers a
        # single sync after ~min_seconds of quiet, or at the latest after
        # max_seconds from the first pending write.
        min_seconds, max_seconds = 10, 300
        while self.__open:
            if self.__needs_sync:
                t0 = t1 = time()
                self.__needs_sync = False
                while self.__open:
                    sleep(0.1)
                    if self.__needs_sync:
                        t1 = time()
                        self.__needs_sync = False
                    if time() - t1 > min_seconds or time() - t0 > max_seconds:
                        self.__needs_sync = False
                        _logger.debug('sync')
                        self.sync()
                        break
            else:
                sleep(1)

    def sync(self):
        if self.__open:
            for i in self.__indicies:
                i.sync()
            self.__contexts.sync()
            self.__namespace.sync()
            self.__prefix.sync()
            self.__i2k.sync()

    def commit(self):
        """
        Commit the current transaction. Bsddb txn objects cannot be reused
        after commit, so a fresh transaction is begun immediately.
        """
        if self.dbTxn:
            _logger.debug('committing')
            self.dbTxn.commit(0)
            self.dbTxn = self.db_env.txn_begin()
        else:
            _logger.warning('No transaction to commit')

    def rollback(self):
        """
        Abort the current transaction. Bsddb txn objects cannot be reused
        after abort, so dbTxn is cleared.
        """
        if self.dbTxn is not None:
            _logger.debug('rolling back')
            self.dbTxn.abort()
            self.dbTxn = None
        else:
            _logger.warning('No transaction to rollback')

    def __del__(self):
        """
        Redirect Python's native garbage collection into Store.close.
        """
        self.close()

    def close(self, commit_pending_transaction=False):
        """
        Close the store, either committing the pending transaction (when the
        parameter says so) or rolling it back by default.
        """
        if not self.__open:
            return None
        if self.dbTxn:
            if not commit_pending_transaction:
                self.rollback()
            else:
                self.commit()
                self.dbTxn.abort()
        self.__open = False
        self.__sync_thread.join()
        for i in self.__indicies:
            i.close()
        self.__contexts.close()
        self.__namespace.close()
        self.__prefix.close()
        self.__i2k.close()
        self.db_env.close()

    def add(self, (subject, predicate, object_), context, quoted=False):
        """
        Add a triple to the store of triples.
        """
        assert self.__open, 'The Store must be open.'
        assert context != self, 'Can not add triple directly to store'
        Store.add(self, (subject, predicate, object_), context, quoted)

        _to_string = self._to_string
        s = _to_string(subject)
        p = _to_string(predicate)
        o = _to_string(object_)
        c = _to_string(context)

        cspo, cpos, cosp = self.__indicies
        value = cspo.get('%s^%s^%s^%s^' % (c, s, p, o), txn=self.dbTxn)
        if value is None:
            self.__contexts.put(c, '', self.dbTxn)

            contexts_value = cspo.get('%s^%s^%s^%s^' % ('', s, p, o), txn=self.dbTxn) or ''
            contexts = set(contexts_value.split('^'))
            contexts.add(c)
            contexts_value = '^'.join(contexts)
            assert contexts_value is not None

            cspo.put('%s^%s^%s^%s^' % (c, s, p, o), '', self.dbTxn)
            cpos.put('%s^%s^%s^%s^' % (c, p, o, s), '', self.dbTxn)
            cosp.put('%s^%s^%s^%s^' % (c, o, s, p), '', self.dbTxn)
            self.__needs_sync = True

    def __remove(self, (s, p, o), c, quoted=False):
        cspo, cpos, cosp = self.__indicies
        contexts_value = cspo.get('^'.join(('', s, p, o, '')), txn=self.dbTxn) or ''
        contexts = set(contexts_value.split('^'))
        contexts.discard(c)
        contexts_value = '^'.join(contexts)

        # Drop the triple from all three indices for this context.
        for i, _to_key, _from_key in self.__indicies_info:
            i.delete(_to_key((s, p, o), c), txn=self.dbTxn)

        if not quoted:
            if contexts_value:
                # Other contexts still hold the triple: update the default-context entry.
                for i, _to_key, _from_key in self.__indicies_info:
                    i.put(_to_key((s, p, o), ''), contexts_value, self.dbTxn)
            else:
                # No contexts left: remove the default-context entry as well.
                for i, _to_key, _from_key in self.__indicies_info:
                    try:
                        i.delete(_to_key((s, p, o), ''), txn=self.dbTxn)
                    except db.DBNotFoundError:
                        pass

    def remove(self, (subject, predicate, object_), context):
        assert self.__open, 'The Store must be open.'
        Store.remove(self, (subject, predicate, object_), context)
        _to_string = self._to_string
        if context is not None:
            if context == self:
                context = None

        if subject is not None and predicate is not None and object_ is not None and context is not None:
            # Fully specified triple and context: remove it directly.
            s = _to_string(subject)
            p = _to_string(predicate)
            o = _to_string(object_)
            c = _to_string(context)
            value = self.__indicies[0].get('%s^%s^%s^%s^' % (c, s, p, o), txn=self.dbTxn)
            if value is not None:
                self.__remove((s, p, o), c)
                self.__needs_sync = True
        else:
            # Pattern removal: walk the best index for the given boundness pattern.
            cspo, cpos, cosp = self.__indicies
            index, prefix, from_key, results_from_key = self.__lookup((subject, predicate, object_), context)

            cursor = index.cursor(txn=self.dbTxn)
            try:
                current = cursor.set_range(prefix)
                needs_sync = True
            except db.DBNotFoundError:
                current = None
                needs_sync = False
            cursor.close()

            while current:
                key, value = current
                cursor = index.cursor(txn=self.dbTxn)
                try:
                    cursor.set_range(key)
                    current = cursor.next()
                except db.DBNotFoundError:
                    current = None
                cursor.close()

                if key.startswith(prefix):
                    c, s, p, o = from_key(key)
                    if context is None:
                        # Remove the triple from every context it appears in,
                        # including the default context.
                        contexts_value = index.get(key, txn=self.dbTxn) or ''
                        contexts = set(contexts_value.split('^'))
                        contexts.add('')
                        for c in contexts:
                            for i, _to_key, _ in self.__indicies_info:
                                i.delete(_to_key((s, p, o), c), txn=self.dbTxn)
                    else:
                        self.__remove((s, p, o), c)
                else:
                    break

            if context is not None:
                if subject is None and predicate is None and object_ is None:
                    try:
                        self.__contexts.delete(_to_string(context), txn=self.dbTxn)
                    except db.DBNotFoundError:
                        pass

            self.__needs_sync = needs_sync

    def triples(self, (subject, predicate, object_), context=None):
        """
        A generator over all the triples matching the given pattern.
        """
        assert self.__open, 'The Store must be open.'
        if context is not None:
            if context == self:
                context = None

        _from_string = self._from_string
        index, prefix, from_key, results_from_key = self.__lookup((subject, predicate, object_), context)

        cursor = index.cursor(txn=self.dbTxn)
        try:
            current = cursor.set_range(prefix)
        except db.DBNotFoundError:
            current = None
        cursor.close()

        while current:
            key, value = current
            # Re-open a cursor each iteration so the yield below does not hold one open.
            cursor = index.cursor(txn=self.dbTxn)
            try:
                cursor.set_range(key)
                current = cursor.next()
            except db.DBNotFoundError:
                current = None
            cursor.close()

            if key and key.startswith(prefix):
                contexts_value = index.get(key, txn=self.dbTxn)
                yield results_from_key(key, subject, predicate, object_, contexts_value)
            else:
                break

    def __len__(self, context=None):
        assert self.__open, 'The Store must be open.'
        if context is not None:
            if context == self:
                context = None

        if context is None:
            prefix = '^'
        else:
            prefix = '%s^' % self._to_string(context)

        index = self.__indicies[0]
        cursor = index.cursor(txn=self.dbTxn)
        current = cursor.set_range(prefix)
        count = 0
        while current:
            key, value = current
            if key.startswith(prefix):
                count += 1
                current = cursor.next()
            else:
                break
        cursor.close()
        return count

    def bind(self, prefix, namespace):
        prefix = prefix.encode('utf-8')
        namespace = namespace.encode('utf-8')
        bound_prefix = self.__prefix.get(namespace, txn=self.dbTxn)
        if bound_prefix:
            self.__namespace.delete(bound_prefix, txn=self.dbTxn)
        self.__prefix.put(namespace, prefix, self.dbTxn)
        self.__namespace.put(prefix, namespace, self.dbTxn)

    def namespace(self, prefix):
        prefix = prefix.encode('utf-8')
        return self.__namespace.get(prefix, None, txn=self.dbTxn)

    def prefix(self, namespace):
        namespace = namespace.encode('utf-8')
        return self.__prefix.get(namespace, None, txn=self.dbTxn)

    def namespaces(self):
        cursor = self.__namespace.cursor(txn=self.dbTxn)
        results = []
        current = cursor.first()
        while current:
            prefix, namespace = current
            results.append((prefix, namespace))
            current = cursor.next()
        cursor.close()
        for prefix, namespace in results:
            yield prefix, URIRef(namespace)

    def contexts(self, triple=None):
        _from_string = self._from_string
        _to_string = self._to_string
        if triple:
            # Only the contexts in which this particular triple occurs.
            s, p, o = triple
            s = _to_string(s)
            p = _to_string(p)
            o = _to_string(o)
            contexts = self.__indicies[0].get('%s^%s^%s^%s^' % ('', s, p, o), txn=self.dbTxn)
            if contexts:
                for c in contexts.split('^'):
                    if c:
                        yield _from_string(c)
        else:
            index = self.__contexts
            cursor = index.cursor(txn=self.dbTxn)
            current = cursor.first()
            cursor.close()
            while current:
                key, value = current
                context = _from_string(key)
                yield context
                # Re-open a cursor after each yield and continue from the last key.
                cursor = index.cursor(txn=self.dbTxn)
                try:
                    cursor.set_range(key)
                    current = cursor.next()
                except db.DBNotFoundError:
                    current = None
                cursor.close()

    def _from_string(self, i):
        """
        Look up the MD5 term hash i in i2k and unpickle the stored term.
        """
        k = self.__i2k.get(i, txn=self.dbTxn)
        return self._loads(k)

    def _to_string(self, term):
        """
        i2k: hashString -> pickledTerm

        i2k stores the reverse lookup from the MD5 hash of a term to its
        pickled form, so the compact hash can be used as the key material
        in the triple indices.
        """
        assert term is not None
        i = term.md5_term_hash()
        k = self.__i2k.get(i, txn=self.dbTxn)
        if k is None:
            self.__i2k.put(i, self._dumps(term), txn=self.dbTxn)
        return i
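
    # Illustrative round trip (sketch; assumes an open store instance and an
    # rdflib term that provides md5_term_hash(), e.g. URIRef('http://example.org/x')):
    #
    #   key = store._to_string(term)    # pickle the term, intern it under its MD5 hash
    #   term = store._from_string(key)  # load the pickled term back out of i2k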

    def __lookup(self, (subject, predicate, object_), context):
        _to_string = self._to_string
        if context is not None:
            context = _to_string(context)

        # Build a bitmask describing which terms are bound:
        # 1 = subject, 2 = predicate, 4 = object.
        i = 0
        if subject is not None:
            i += 1
            subject = _to_string(subject)
        if predicate is not None:
            i += 2
            predicate = _to_string(predicate)
        if object_ is not None:
            i += 4
            object_ = _to_string(object_)

        index, prefix_func, from_key, results_from_key = self.__lookup_dict[i]
        prefix = '^'.join(prefix_func((subject, predicate, object_), context))
        return (index, prefix, from_key, results_from_key)
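
    # Illustrative mapping (sketch) from the boundness bitmask to the chosen
    # index; real prefixes contain the MD5 hashes produced by _to_string, the
    # plain letters below are stand-ins:
    #
    #   (s, None, None) -> i = 1 -> cspo index, prefix 'c^s^'
    #   (None, p, o)    -> i = 6 -> cpos index, prefix 'c^p^o^'
    #   (s, None, o)    -> i = 5 -> cosp index, prefix 'c^o^s^'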


def to_key_func(i):

    def to_key(triple, context):
        """Takes string terms (triple, context); returns the '^'-joined index key."""
        return '^'.join((context, triple[i % 3], triple[(i + 1) % 3], triple[(i + 2) % 3], ''))

    return to_key


def from_key_func(i):

    def from_key(key):
        """Takes an index key; returns the (context, subject, predicate, object) strings."""
        parts = key.split('^')
        return (parts[0],
                parts[((3 - i) + 0) % 3 + 1],
                parts[((3 - i) + 1) % 3 + 1],
                parts[((3 - i) + 2) % 3 + 1])

    return from_key
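
# Worked example (illustrative): to_key_func/from_key_func rotate the triple so
# that each of the three indices sorts on a different leading term. With plain
# strings standing in for the hashed terms:
#
#   to_key_func(0)(('s1', 'p1', 'o1'), 'c1')  ->  'c1^s1^p1^o1^'   (cspo index)
#   to_key_func(1)(('s1', 'p1', 'o1'), 'c1')  ->  'c1^p1^o1^s1^'   (cpos index)
#   to_key_func(2)(('s1', 'p1', 'o1'), 'c1')  ->  'c1^o1^s1^p1^'   (cosp index)
#
# from_key_func(i) undoes the rotation and always returns (context, s, p, o).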


def results_from_key_func(i, from_string):

    def from_key(key, subject, predicate, object_, contexts_value):
        """Takes a key plus any already-bound terms; returns the (triple, contexts) pair to yield."""
        parts = key.split('^')
        if subject is None:
            s = from_string(parts[((3 - i) + 0) % 3 + 1])
        else:
            s = subject
        if predicate is None:
            p = from_string(parts[((3 - i) + 1) % 3 + 1])
        else:
            p = predicate
        if object_ is None:
            o = from_string(parts[((3 - i) + 2) % 3 + 1])
        else:
            o = object_
        return (s, p, o), (from_string(c) for c in contexts_value.split('^') if c)

    return from_key


def readable_index(i):
    s, p, o = '???'
    if i & 1:
        s = 's'
    if i & 2:
        p = 'p'
    if i & 4:
        o = 'o'
    return '%s,%s,%s' % (s, p, o)
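

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): exercises the
# store directly through the rdflib Store API. The path, context URI and triple
# below are examples only; rdflib's Graph machinery would normally supply the
# context.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    store = BerkeleyDB()
    store.open('/tmp/berkeleydb_store_example', create=True)
    try:
        ctx = URIRef('http://example.org/context')
        store.add((URIRef('http://example.org/s'),
                   URIRef('http://example.org/p'),
                   URIRef('http://example.org/o')),
                  context=ctx)
        store.commit()  # commit the pending dbTxn; commit() begins a fresh one
        for (s, p, o), contexts in store.triples((None, None, None), context=ctx):
            print s, p, o
    finally:
        store.close()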